package com.danan.data_loader.app;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.danan.data_loader.config.SourceConfig;
import com.danan.data_loader.config.TargetConfig;
import com.danan.data_loader.function.OracleBatchSinkFunction;
import com.danan.data_loader.function.OracleSinkFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;

/**
 * Spring component that, when run as a {@link CommandLineRunner}, attaches an
 * {@link OracleBatchSinkFunction} to the injected string stream and then executes the injected
 * Flink {@link StreamExecutionEnvironment}.
 *
 * <p>The stream, the environment and both configuration objects are supplied through
 * {@code @Resource} injection; where those beans are defined is not visible from this class.
 *
 * @Author: NanHuang
 * @Date: 2023/05/13/10:18
 * @Description: test driver wiring the injected stream to the Oracle batch sink.
 */
@Component
public class TestApp implements CommandLineRunner {
    /** Injected stream of string records that the sink is attached to. */
    @Resource
    private DataStreamSource<String> ds;
    /** Injected Flink environment; executed at the end of {@link #run(String...)}. */
    @Resource
    private StreamExecutionEnvironment env;
    /** Injected source-side settings; handed to the sink as a JSON string. */
    @Resource
    private SourceConfig sourceConfig;
    /** Injected target-side settings; handed to the sink as a JSON string. */
    @Resource
    private TargetConfig targetConfig;

    /**
     * Registers the batch sink on the injected stream and executes the environment.
     *
     * <p>Alternative wirings that were tried here and are currently disabled:
     * <ul>
     *   <li>{@code ds.print();}</li>
     *   <li>1. Single-record processing:
     *       {@code ds.addSink(new OracleSinkFunction(JSON.toJSONString(sourceConfig),
     *       JSON.toJSONString(targetConfig)));}</li>
     *   <li>3. Batching per table (unfinished sketch):
     *       {@code ds.map(JSON::parseObject)
     *       .keyBy(j -> j.getString("schema") + "." + j.getString("table"))
     *       .process(new KeyedProcessFunction<String, JSONObject, JSONObject>() {...})},
     *       where {@code processElement} was still empty.</li>
     * </ul>
     *
     * @param args command-line arguments; not used by this method
     * @throws Exception propagated from {@code env.execute()} or from building the sink
     */
    @Override
    public void run(String... args) throws Exception {
        // 2. Batch processing test: the only wiring that is currently active.
        final OracleBatchSinkFunction batchSink = buildBatchSink();
        ds.addSink(batchSink);

        env.execute();
    }

    /**
     * Builds the batch sink from the JSON form of the source and target configuration.
     *
     * @return a new sink constructed with the serialized source config first and the serialized
     *         target config second
     */
    private OracleBatchSinkFunction buildBatchSink() {
        // Serialize source before target, matching the constructor's argument order.
        final String sourceJson = JSON.toJSONString(sourceConfig);
        final String targetJson = JSON.toJSONString(targetConfig);
        return new OracleBatchSinkFunction(sourceJson, targetJson);
    }

}
